package elephant.rpc.network.netty;

import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import elephant.rpc.client.RPCClient;
import elephant.rpc.client.core.NetworkService;
import elephant.rpc.network.netty.codec.FastJsonMessageDecoder;
import elephant.rpc.network.netty.codec.FastJsonMessageEncoder;
import elephant.rpc.network.netty.codec.JDKSerialMessageDecoder;
import elephant.rpc.network.netty.codec.JDKSerialMessageEncoder;
import elephant.rpc.server.core.PerformMonitor;
import elephant.rpc.server.core.RPCChannel;
import elephant.rpc.server.session.RPCSession;
import elephant.rpc.threadpool.RPCThreadFactory;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.compression.ZlibWrapper;
import io.netty.handler.codec.serialization.ClassResolvers;

/**
 * Client-side {@link NetworkService} implementation backed by Netty NIO.
 * <p>
 * {@link #start()} builds a {@link Bootstrap} whose channel pipeline applies
 * GZIP stream compression followed by the message codec selected through
 * {@code RPCClient.codecType}; {@link #connectToServer(RPCSession)} then uses
 * that bootstrap to open one channel per {@link RPCSession}.
 * <p>
 * NOTE(review): the single {@link NettyClientHandler} created in the
 * constructor is added to the pipeline of every channel, so it presumably has
 * to be a sharable handler - confirm against NettyClientHandler.
 *
 * @author skydu
 *
 */
public class NettyClientService extends NetworkService{
	//
	private static final Logger logger=LoggerFactory.getLogger(NettyClientService.class);
	//
	/** Created by {@link #start()}; {@code null} until then. */
	private Bootstrap bootstrap;
	/** Event loop group owning the client I/O thread; {@code null} until {@link #start()}. */
	private EventLoopGroup group;
	/** Terminal inbound handler, shared by all channels created by this service. */
	private final NettyClientHandler handler;
	/** Monitor handed to the FastJson encoder/decoder. */
	private final PerformMonitor performMonitor;
	//
	/**
	 * @param rpcClient owning client; supplies the {@link PerformMonitor}
	 */
	public NettyClientService(RPCClient rpcClient) {
		super(rpcClient);
		this.performMonitor=rpcClient.getPerformMonitor();
		handler=new NettyClientHandler(this);
	}

	/**
	 * Lifecycle hook; only logs, all real setup happens in {@link #start()}.
	 */
	@Override
	public void init() throws Exception {
		logger.info("{} init",getClass().getSimpleName());
	}

	/**
	 * Creates the event loop group and configures the bootstrap
	 * (keep-alive, no-delay, compression and codec pipeline).
	 * No connection is opened here.
	 */
	@Override
	public void start() throws Exception {
		logger.info("{} start",getClass().getSimpleName());
		// Hand the thread factory directly to the event loop group instead of
		// wrapping it in a fixed thread pool: the pool was never shut down, so
		// its worker thread stayed parked after shutdownGracefully(). With a
		// plain factory the I/O thread terminates together with the event loop.
		group = new NioEventLoopGroup(1,
				new RPCThreadFactory("RPCClientNioEventWorker",
						client.getContextClassLoader()));
		bootstrap = new Bootstrap();
		bootstrap.group(group)
			.channel(NioSocketChannel.class)
			.option(ChannelOption.SO_KEEPALIVE, true)
			.option(ChannelOption.TCP_NODELAY, true)
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ChannelPipeline pipeline=ch.pipeline();
					// Enable stream compression (these two can be removed if
					// unnecessary; presumably the server pipeline must then be
					// changed in the same way - confirm).
					pipeline.addLast(ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP));
					pipeline.addLast(ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP));
					//
					addCodec(pipeline);
				}
			});
	}

	/**
	 * Appends the encoder, decoder and shared handler matching
	 * {@code RPCClient.codecType} to the given pipeline. An unrecognised
	 * codec type installs nothing and is reported with a warning.
	 */
	private void addCodec(ChannelPipeline pipeline){
		if(RPCClient.codecType.equals(RPCClient.CODEC_TYPE_JDK)){
			logger.debug("codec type jdk");
			// Runs on the event loop thread, so classes are resolved through
			// that thread's context class loader.
			pipeline.addLast(
					new JDKSerialMessageEncoder(),
					new JDKSerialMessageDecoder(
							ClassResolvers.softCachingConcurrentResolver(
									Thread.currentThread().getContextClassLoader())),
					handler);
		}else if(RPCClient.codecType.equals(RPCClient.CODEC_TYPE_FASTJSON)){
			logger.debug("codec type fastjson");
			pipeline.addLast(
					new FastJsonMessageEncoder(performMonitor),
					new FastJsonMessageDecoder(performMonitor),
					handler);
		}else{
			// Previously this case was silent and left the channel without
			// any codec or handler.
			logger.warn("unknown codec type {}, no codec installed",RPCClient.codecType);
		}
	}

	/**
	 * Requests a graceful shutdown of the event loop group. The shutdown
	 * future is not awaited. Does nothing if {@link #start()} was never called.
	 */
	@Override
	public void stop() throws Exception {
		logger.info("{} stop",getClass().getSimpleName());
		if(group==null){
			// never started, nothing to release
			return;
		}
		try {
			group.shutdownGracefully();
		} catch (Exception e) {
			logger.error(e.getMessage(),e);
		}
	}

	//on RPCResource start
	/**
	 * Blocks until a connection to {@code session.remoteHost:session.remotePort}
	 * is established, binds the session and the new channel to each other and
	 * then calls {@code auth(session)}.
	 * <p>
	 * Failures are logged and not rethrown; if the calling thread was
	 * interrupted while waiting, its interrupt flag is restored.
	 *
	 * @param session session describing the remote endpoint; receives the channel
	 */
	@Override
	public synchronized void connectToServer(RPCSession session){
		String host=session.remoteHost;
		int port=session.remotePort;
		try {
			logger.warn("connect to rpc server {}:{}",host,port);
			Channel channel=bootstrap.connect(host, port).sync().channel();
			channel.attr(RPCSession.SESSION_KEY).set(session);
			RPCChannel rpcChannel=new NettyChannel(channel);
			session.channel=rpcChannel;
			//
			auth(session);
		} catch (Throwable e) {
			if(e instanceof InterruptedException){
				// sync() was interrupted: keep the flag visible to the caller
				Thread.currentThread().interrupt();
			}
			logger.warn("can not connect to server {}/{} msg:{}",host,port,e.getMessage());
			// full stack trace only at debug level to keep retry logs short
			logger.debug("connect to rpc server {}:{} failed",host,port,e);
		}
	}
}
